package kafka

import (
	"code.simon.com/data-agent/common"
	"code.simon.com/data-agent/conf"
	"context"
	"github.com/Shopify/sarama"
	"github.com/sirupsen/logrus"
	"sync"
)

var (
	taskManager taskManage
	once        sync.Once
)

// task bundles one kafka role (source consumer or target producer) with the
// topic it operates on and an optional cancellation context.
// Only one of consumer/producer is set, depending on which Init* created it.
type task struct {
	consumer sarama.Consumer     // set by InitSource; nil for target tasks
	producer sarama.SyncProducer // set by InitTarget; nil for source tasks
	Topic    string              // topic this task consumes from / produces to
	ctx      context.Context     // currently always nil — reserved for future cancellation
	Cancel   context.CancelFunc  // currently always nil — reserved for future cancellation
}

// taskManage is the kafka task registry; it maps a role key (e.g. "source")
// to its running task. Accessed through the package singleton taskManager.
type taskManage struct {
	msrTask map[string]*task
}

// GetTaskManager returns the kafka task manager singleton, lazily
// initializing its task map exactly once.
func GetTaskManager() *taskManage {
	once.Do(func() {
		taskManager = taskManage{
			msrTask: make(map[string]*task),
		}
	})
	return &taskManager
}

// InitSource initializes kafka as the data source: it creates a consumer
// for the configured source topic, registers it under the "source" key,
// and starts one goroutine per partition that forwards every received
// message into the middle-layer channel (common.MidChannel).
func (tm *taskManage) InitSource() {
	// Create a new consumer against the configured source broker.
	consumer, cErr := sarama.NewConsumer([]string{conf.ConfigObj.KafkaConfig.SourceIp}, nil)
	common.ManualHandleError("source:Failed to create new kafka consumer", cErr)
	// Register the consumer task; ctx/Cancel are reserved and stay nil.
	tt := task{
		Topic:    conf.ConfigObj.KafkaConfig.SourceTopic,
		ctx:      nil,
		Cancel:   nil,
		consumer: consumer,
	}
	tm.msrTask["source"] = &tt
	partitions, pErr := consumer.Partitions(conf.ConfigObj.KafkaConfig.SourceTopic)
	common.ManualHandleError("source:Failed to read the consumer partitions", pErr)
	// Partitions() returns the actual partition IDs, so iterate the values.
	// (The original ranged over the slice indices, which is only correct
	// when the IDs happen to be 0..n-1.)
	for _, partition := range partitions {
		// Create a dedicated partition consumer for each partition.
		consumePartition, cpErr := consumer.ConsumePartition(conf.ConfigObj.KafkaConfig.SourceTopic, partition, sarama.OffsetNewest)
		common.ManualHandleError("source:Failed to create a corresponding partition consumer for each partition", cpErr)
		// Consume asynchronously. The partition consumer is passed as a
		// parameter so the goroutine does not capture the loop variable
		// (pre-Go1.22 capture bug: every goroutine would otherwise share
		// — and AsyncClose — the last iteration's consumer).
		go func(pc sarama.PartitionConsumer) {
			defer pc.AsyncClose()
			for msg := range pc.Messages() {
				logrus.Printf("partition:%d Offset: %d key:%s value:%s", msg.Partition, msg.Offset, msg.Key, msg.Value)
				// Forward to the middle-layer channel.
				common.MidChannel <- &common.MidChannelModel{
					Value: &msg.Value,
				}
			}
		}(consumePartition)
	}

}
// InitTarget initializes kafka as the data target: it creates a sync
// producer, registers it under the "target" key, and starts a goroutine
// that drains common.MidChannel into kafka.
func (tm *taskManage) InitTarget() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // leader and followers must both acknowledge
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition
	config.Producer.Return.Successes = true
	// Connect to kafka.
	// NOTE(review): the target producer connects to SourceIp — confirm the
	// target is intentionally on the same broker as the source.
	client, err := sarama.NewSyncProducer([]string{conf.ConfigObj.KafkaConfig.SourceIp}, config)
	// Check the connection error before storing or using the client.
	common.ManualHandleError("kafka:producer closed,err", err)
	tt := task{
		Topic:    conf.ConfigObj.KafkaConfig.TargetTopic,
		ctx:      nil,
		Cancel:   nil,
		producer: client,
	}
	// Store the producer task under "target". (The original stored it under
	// "source", overwriting the task registered by InitSource.)
	tm.msrTask["target"] = &tt
	logrus.Info("kafka:producer success init ")
	// Start the kafka producer loop that auto-forwards queued messages.
	go autoSendMsg(&tt)

}

// autoSendMsg reads messages from the middle-layer channel and publishes
// them to kafka through the task's sync producer. It runs until
// common.MidChannel is closed.
func autoSendMsg(tt *task) {
	logrus.Info("kafka:producer success start ")
	// Range over the channel instead of a single-case select: the select
	// form never terminated and would spin on nil messages once the
	// channel was closed.
	for msg := range common.MidChannel {
		// Route by the message key when present, otherwise fall back to
		// the configured target topic.
		// NOTE(review): this reads conf.ConfigObj.TargetTopic while
		// InitTarget reads conf.ConfigObj.KafkaConfig.TargetTopic —
		// confirm these are intentionally different settings.
		finalTopic := msg.Key
		if len(finalTopic) == 0 {
			finalTopic = conf.ConfigObj.TargetTopic
		}
		// SendMessage returns (partition, offset, error).
		partition, offset, err := tt.producer.SendMessage(&sarama.ProducerMessage{
			Topic: finalTopic,
			Value: sarama.ByteEncoder(*msg.Value),
		})
		if err != nil {
			logrus.Error("send Message failed, err:", err)
			// Skip the success log for a failed send (the original logged
			// success unconditionally).
			continue
		}
		logrus.Printf("send msg to kafka success:pid:%v offset: %v topic: %v", partition, offset, finalTopic)
	}
}

// GetNewMessage builds a producer message for the given topic carrying
// text as a string-encoded payload.
func GetNewMessage(topic string, text string) *sarama.ProducerMessage {
	return &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(text),
	}
}
